# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
from io import StringIO, TextIOWrapper
from unittest import TestCase, main
from unittest.mock import MagicMock, Mock, patch
from contextlib import ExitStack

from ..ansitowin32 import AnsiToWin32, StreamWrapper
from ..win32 import ENABLE_VIRTUAL_TERMINAL_PROCESSING
from .utils import osname


class StreamWrapperTest(TestCase):

    def testIsAProxy(self):
        mockStream = Mock()
        wrapper = StreamWrapper(mockStream, None)
        self.assertTrue( wrapper.random_attr is mockStream.random_attr )

    def testDelegatesWrite(self):
        mockStream = Mock()
        mockConverter = Mock()
        wrapper = StreamWrapper(mockStream, mockConverter)
        wrapper.write('hello')
        self.assertTrue(mockConverter.write.call_args, (('hello',), {}))

    def testDelegatesContext(self):
        mockConverter = Mock()
        s = StringIO()
        with StreamWrapper(s, mockConverter) as fp:
            fp.write('hello')
        self.assertTrue(s.closed)

    def testProxyNoContextManager(self):
        mockStream = MagicMock()
        mockStream.__enter__.side_effect = AttributeError()
        mockConverter = Mock()
        with self.assertRaises(AttributeError) as excinfo:
            with StreamWrapper(mockStream, mockConverter) as wrapper:
                wrapper.write('hello')

    def test_closed_shouldnt_raise_on_closed_stream(self):
        stream = StringIO()
        stream.close()
        wrapper = StreamWrapper(stream, None)
        self.assertEqual(wrapper.closed, True)

    def test_closed_shouldnt_raise_on_detached_stream(self):
        stream = TextIOWrapper(StringIO())
        stream.detach()
        wrapper = StreamWrapper(stream, None)
        self.assertEqual(wrapper.closed, True)

class AnsiToWin32Test(TestCase):

    def testInit(self):
        mockStdout = Mock()
        auto = Mock()
        stream = AnsiToWin32(mockStdout, autoreset=auto)
        self.assertEqual(stream.wrapped, mockStdout)
        self.assertEqual(stream.autoreset, auto)

    @patch('colorama.ansitowin32.winterm', None)
    @patch('colorama.ansitowin32.winapi_test', lambda *_: True)
    def testStripIsTrueOnWindows(self):
        with osname('nt'):
            mockStdout = Mock()
            stream = AnsiToWin32(mockStdout)
            self.assertTrue(stream.strip)

    def testStripIsFalseOffWindows(self):
        with osname('posix'):
            mockStdout = Mock(closed=False)
            stream = AnsiToWin32(mockStdout)
            self.assertFalse(stream.strip)

    def testWriteStripsAnsi(self):
        mockStdout = Mock()
        stream = AnsiToWin32(mockStdout)
        stream.wrapped = Mock()
        stream.write_and_convert = Mock()
        stream.strip = True

        stream.write('abc')

        self.assertFalse(stream.wrapped.write.called)
        self.assertEqual(stream.write_and_convert.call_args, (('abc',), {}))

    def testWriteDoesNotStripAnsi(self):
        mockStdout = Mock()
        stream = AnsiToWin32(mockStdout)
        stream.wrapped = Mock()
        stream.write_and_convert = Mock()
        stream.strip = False
        stream.convert = False

        stream.write('abc')

        self.assertFalse(stream.write_and_convert.called)
        self.assertEqual(stream.wrapped.write.call_args, (('abc',), {}))

    def assert_autoresets(self, convert, autoreset=True):
        stream = AnsiToWin32(Mock())
        stream.convert = convert
        stream.reset_all = Mock()
        stream.autoreset = autoreset
        stream.winterm = Mock()

        stream.write('abc')

        self.assertEqual(stream.reset_all.called, autoreset)

    def testWriteAutoresets(self):
        self.assert_autoresets(convert=True)
        self.assert_autoresets(convert=False)
        self.assert_autoresets(convert=True, autoreset=False)
        self.assert_autoresets(convert=False, autoreset=False)

    def testWriteAndConvertWritesPlainText(self):
        stream = AnsiToWin32(Mock())
        stream.write_and_convert( 'abc' )
        self.assertEqual( stream.wrapped.write.call_args, (('abc',), {}) )

    def testWriteAndConvertStripsAllValidAnsi(self):
        stream = AnsiToWin32(Mock())
        stream.call_win32 = Mock()
        data = [
            'abc\033[mdef',
            'abc\033[0mdef',
            'abc\033[2mdef',
            'abc\033[02mdef',
            'abc\033[002mdef',
            'abc\033[40mdef',
            'abc\033[040mdef',
            'abc\033[0;1mdef',
            'abc\033[40;50mdef',
            'abc\033[50;30;40mdef',
            'abc\033[Adef',
            'abc\033[0Gdef',
            'abc\033[1;20;128Hdef',
        ]
        for datum in data:
            stream.wrapped.write.reset_mock()
            stream.write_and_convert( datum )
            self.assertEqual(
               [args[0] for args in stream.wrapped.write.call_args_list],
               [ ('abc',), ('def',) ]
            )

    def testWriteAndConvertSkipsEmptySnippets(self):
        stream = AnsiToWin32(Mock())
        stream.call_win32 = Mock()
        stream.write_and_convert( '\033[40m\033[41m' )
        self.assertFalse( stream.wrapped.write.called )

    def testWriteAndConvertCallsWin32WithParamsAndCommand(self):
        stream = AnsiToWin32(Mock())
        stream.convert = True
        stream.call_win32 = Mock()
        stream.extract_params = Mock(return_value='params')
        data = {
            'abc\033[adef':         ('a', 'params'),
            'abc\033[;;bdef':       ('b', 'params'),
            'abc\033[0cdef':        ('c', 'params'),
            'abc\033[;;0;;Gdef':    ('G', 'params'),
            'abc\033[1;20;128Hdef': ('H', 'params'),
        }
        for datum, expected in data.items():
            stream.call_win32.reset_mock()
            stream.write_and_convert( datum )
            self.assertEqual( stream.call_win32.call_args[0], expected )

    def test_reset_all_shouldnt_raise_on_closed_orig_stdout(self):
        stream = StringIO()
        converter = AnsiToWin32(stream)
        stream.close()

        converter.reset_all()

    def test_wrap_shouldnt_raise_on_closed_orig_stdout(self):
        stream = StringIO()
        stream.close()
        with \
            patch("colorama.ansitowin32.os.name", "nt"), \
            patch("colorama.ansitowin32.winapi_test", lambda: True):
                converter = AnsiToWin32(stream)
        self.assertTrue(converter.strip)
        self.assertFalse(converter.convert)

    def test_wrap_shouldnt_raise_on_missing_closed_attr(self):
        with \
            patch("colorama.ansitowin32.os.name", "nt"), \
            patch("colorama.ansitowin32.winapi_test", lambda: True):
                converter = AnsiToWin32(object())
        self.assertTrue(converter.strip)
        self.assertFalse(converter.convert)

    def testExtractParams(self):
        stream = AnsiToWin32(Mock())
        data = {
            '':               (0,),
            ';;':             (0,),
            '2':              (2,),
            ';;002;;':        (2,),
            '0;1':            (0, 1),
            ';;003;;456;;':   (3, 456),
            '11;22;33;44;55': (11, 22, 33, 44, 55),
        }
        for datum, expected in data.items():
            self.assertEqual(stream.extract_params('m', datum), expected)

    def testCallWin32UsesLookup(self):
        listener = Mock()
        stream = AnsiToWin32(listener)
        stream.win32_calls = {
            1: (lambda *_, **__: listener(11),),
            2: (lambda *_, **__: listener(22),),
            3: (lambda *_, **__: listener(33),),
        }
        stream.call_win32('m', (3, 1, 99, 2))
        self.assertEqual(
            [a[0][0] for a in listener.call_args_list],
            [33, 11, 22] )

    def test_osc_codes(self):
        mockStdout = Mock()
        stream = AnsiToWin32(mockStdout, convert=True)
        with patch('colorama.ansitowin32.winterm') as winterm:
            data = [
                '\033]0\x07',                      # missing arguments
                '\033]0;foo\x08',                  # wrong OSC command
                '\033]0;colorama_test_title\x07',  # should work
                '\033]1;colorama_test_title\x07',  # wrong set command
                '\033]2;colorama_test_title\x07',  # should work
                '\033]' + ';' * 64 + '\x08',       # see issue #247
            ]
            for code in data:
                stream.write(code)
            self.assertEqual(winterm.set_title.call_count, 2)

    def test_native_windows_ansi(self):
        with ExitStack() as stack:
            def p(a, b):
                stack.enter_context(patch(a, b, create=True))
            # Pretend to be on Windows
            p("colorama.ansitowin32.os.name", "nt")
            p("colorama.ansitowin32.winapi_test", lambda: True)
            p("colorama.win32.winapi_test", lambda: True)
            p("colorama.winterm.win32.windll", "non-None")
            p("colorama.winterm.get_osfhandle", lambda _: 1234)

            # Pretend that our mock stream has native ANSI support
            p(
                "colorama.winterm.win32.GetConsoleMode",
                lambda _: ENABLE_VIRTUAL_TERMINAL_PROCESSING,
            )
            SetConsoleMode = Mock()
            p("colorama.winterm.win32.SetConsoleMode", SetConsoleMode)

            stdout = Mock()
            stdout.closed = False
            stdout.isatty.return_value = True
            stdout.fileno.return_value = 1

            # Our fake console says it has native vt support, so AnsiToWin32 should
            # enable that support and do nothing else.
            stream = AnsiToWin32(stdout)
            SetConsoleMode.assert_called_with(1234, ENABLE_VIRTUAL_TERMINAL_PROCESSING)
            self.assertFalse(stream.strip)
            self.assertFalse(stream.convert)
            self.assertFalse(stream.should_wrap())

            # Now let's pretend we're on an old Windows console, that doesn't have
            # native ANSI support.
            p("colorama.winterm.win32.GetConsoleMode", lambda _: 0)
            SetConsoleMode = Mock()
            p("colorama.winterm.win32.SetConsoleMode", SetConsoleMode)

            stream = AnsiToWin32(stdout)
            SetConsoleMode.assert_called_with(1234, ENABLE_VIRTUAL_TERMINAL_PROCESSING)
            self.assertTrue(stream.strip)
            self.assertTrue(stream.convert)
            self.assertTrue(stream.should_wrap())


if __name__ == '__main__':
    main()
